package com.atguigu.flink.datastreamapi.sink;

import com.alibaba.fastjson.JSON;
import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

/**
 * Created by Smexy on 2023/11/13
 */
public class Demo3_FileSink
{
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5s. The FileSink only finalizes part files (renames them
        // from the hidden ".inprogress" state to committed files) when a checkpoint
        // completes, so checkpointing must be enabled to get readable output files.
        env.enableCheckpointing(5000);
        env.setParallelism(2);

        /*
            Build the FileSink.
                forRowFormat(): writes records in a row-oriented format (json, csv). Most common.
                    forRowFormat(
                        final Path basePath,    parent output directory
                        final Encoder<IN> encoder    encoder for records of type IN
                    )
                forBulkFormat(): writes records in a columnar format (orc, parquet).
         */
        FileSink<String> fileSink = FileSink
            .<String>forRowFormat(new Path("E:\\tmp"), new SimpleStringEncoder<String>())
            // Bucket records into sub-directories under basePath, one per formatted
            // timestamp. Note: ':' is illegal in Windows file names, so the pattern
            // uses "HH-mm" instead of "HH:mm".
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm"))
            // Part-file name prefix and suffix.
            .withOutputFileConfig(new OutputFileConfig("atguigu", ".log"))
            /*
                Rolling policy (DefaultRollingPolicy): a part file rolls — is closed
                and a new one is opened — when either
                    - it exceeds 1 MiB, or
                    - it has been open for 30 seconds.
                (No inactivity interval is configured here.)
             */
            .withRollingPolicy(
                DefaultRollingPolicy.builder()
                                    .withMaxPartSize(MemorySize.ofMebiBytes(1L))
                                    .withRolloverInterval(Duration.ofSeconds(30L))
                                    .build()
            )
            // How often the sink checks whether open files should roll.
            // Keep this <= the rollover interval so rolls are not delayed.
            .withBucketCheckInterval(30 * 1000L)
            .build();

        // Unbounded test source: random WaterSensor records at 2000 records/second.
        DataGeneratorSource<WaterSensor> source = new DataGeneratorSource<>(
            new GeneratorFunction<Long, WaterSensor>()
            {
                @Override
                public WaterSensor map(Long value) throws Exception {
                    return new WaterSensor(
                        "s" + RandomUtils.nextInt(1, 11),   // sensor id s1..s10
                        System.currentTimeMillis(),
                        RandomUtils.nextInt(10, 1001)       // water level 10..1000
                    );
                }
            },
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(2000d),
            TypeInformation.of(WaterSensor.class)
        );

        env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
            .map(JSON::toJSONString)
            .sinkTo(fileSink);

        env.execute();

    }
}
